#!/usr/bin/env bash
usage() {
    cat <<EOF
Usage: stream-admin create|update <command> <args...>
where command is one of:
    all                 Run all source, sinks and agent.
    source              Run emqx source
    func-distribute     Run the distribute function
    agent               Run the rule agent
    rules               Update all running rules
    sink-db             Run the db sink
    sink-webhook        Run the webhook sink
    sink-mail           Run the mail sink
    sink-mqtt           Run the mqtt sink
    sink-publish        Run the publish sink
    sink-ac-db          Run the actorcloud db sink
where argument is one of:
    -force (accepted only with stop command): Decides whether to stop the server forcefully if not stopped by normal shutdown
EOF
}
# Bash file to setup the pulsar stream environment
StreamPath=$(cd $(dirname $0);pwd)
PulsarHome=$(dirname "$StreamPath")
PulsarAdmin="${PulsarHome}/bin/pulsar-admin"
StreamVersion="0.5.3"
Tenant="public"
Namespace="default"
ActionPrefix="__acaction"
Parallelism=1

if [ $# -lt 1 ]
then
    echo "Error: no enough arguments provided."
    usage
    exit 1
fi

createUpdate=$1
shift
command=$1
shift

echo "doing $command ..."


if [ "$command" = "all" ] || [ "$command" = "source" ]; then
    if [ "$1" == "-force" ] || [ "$createUpdate" == "create" ]; then
        echo "Creating source"
        ${PulsarAdmin} functions delete --fqfn ${Tenant}/${Namespace}/__source_emqx_all
        ${PulsarAdmin} source create --className io.emqx.pulsar.io.EMQXSource --archive ${StreamPath}/emqx-source-${StreamVersion}.nar --tenant ${Tenant} --namespace ${Namespace} --name __source_emqx_all --destinationTopicName persistent://${Tenant}/${Namespace}/__emqx_all --source-config-file ${StreamPath}/emqx-config.yml --parallelism ${Parallelism}
    else
        echo "Updating source"
        ${PulsarAdmin} source update --className io.emqx.pulsar.io.EMQXSource --archive ${StreamPath}/emqx-source-${StreamVersion}.nar --tenant ${Tenant} --namespace ${Namespace} --name __source_emqx_all --destinationTopicName persistent://${Tenant}/${Namespace}/__emqx_all --source-config-file ${StreamPath}/emqx-config.yml --parallelism ${Parallelism}
    fi
fi

if [ "$command" = "all" ] || [ "$command" = "func-distribute" ]; then
    echo "Creating distribute function"
    if [ "$1" == "-force" ] || [ "$createUpdate" == "create" ]; then
        ${PulsarAdmin} functions delete --fqfn ${Tenant}/${Namespace}/__function_distribute
        ${PulsarAdmin} functions create --fqfn ${Tenant}/${Namespace}/__function_distribute --className io.emqx.pulsar.functions.TopicDistributeFunction --jar ${StreamPath}/topic-distribute-function-${StreamVersion}-jar-with-dependencies.jar --inputs persistent://${Tenant}/${Namespace}/__emqx_all --parallelism ${Parallelism}
    else
        ${PulsarAdmin} functions update --fqfn ${Tenant}/${Namespace}/__function_distribute --className io.emqx.pulsar.functions.TopicDistributeFunction --jar ${StreamPath}/topic-distribute-function-${StreamVersion}-jar-with-dependencies.jar --inputs persistent://${Tenant}/${Namespace}/__emqx_all --parallelism ${Parallelism}
    fi
fi

if [ "$command" = "all" ] || [ "$command" = "agent" ]; then
    echo "Creating stream agent"
    # Env for agent service
    echo "RULE_ENGINE_PATH=${StreamPath}" > /etc/systemd/system/agent.conf
    service agent stop || /bin/cp ${StreamPath}/agent.service /etc/systemd/system/agent.service && service agent start
fi

if [ "$command" = "all" ] || [ "$command" = "sink-db" ]; then
    echo "Creating Database Sink"
    if [ "$1" == "-force" ] || [ "$createUpdate" == "create" ]; then
        ${PulsarAdmin} functions delete --fqfn ${Tenant}/${Namespace}/__sink_db
        ${PulsarAdmin} sink create --className io.emqx.pulsar.io.DatabaseSink --archive ${StreamPath}/db-sink-${StreamVersion}.nar --tenant ${Tenant} --namespace ${Namespace} --name __sink_db --inputs persistent://${Tenant}/${Namespace}/${ActionPrefix}_db --sink-config-file ${StreamPath}/db-sink-config.yml
    else
        ${PulsarAdmin} sink update --className io.emqx.pulsar.io.DatabaseSink --archive ${StreamPath}/db-sink-${StreamVersion}.nar --tenant ${Tenant} --namespace ${Namespace} --name __sink_db --inputs persistent://${Tenant}/${Namespace}/${ActionPrefix}_db --sink-config-file ${StreamPath}/db-sink-config.yml
    fi
fi

if [ "$command" = "all" ] || [ "$command" = "sink-webhook" ]; then
    echo "Creating Webhook Sink"
    if [ "$1" == "-force" ] || [ "$createUpdate" == "create" ]; then
        ${PulsarAdmin} functions delete --fqfn ${Tenant}/${Namespace}/__sink_webhook
        ${PulsarAdmin} sink create --className io.emqx.pulsar.io.WebhookSink --archive ${StreamPath}/webhook-sink-${StreamVersion}.nar --tenant ${Tenant} --namespace ${Namespace} --name __sink_webhook --inputs persistent://${Tenant}/${Namespace}/${ActionPrefix}_webhook
    else
        ${PulsarAdmin} sink update --className io.emqx.pulsar.io.WebhookSink --archive ${StreamPath}/webhook-sink-${StreamVersion}.nar --tenant ${Tenant} --namespace ${Namespace} --name __sink_webhook --inputs persistent://${Tenant}/${Namespace}/${ActionPrefix}_webhook
    fi
fi

if [ "$command" = "all" ] || [ "$command" = "sink-mail" ]; then
    echo "Creating Mail Sink"
    if [ "$1" == "-force" ] || [ "$createUpdate" == "create" ]; then
        ${PulsarAdmin} functions delete --fqfn ${Tenant}/${Namespace}/__sink_mail
        ${PulsarAdmin} sink create --className io.emqx.pulsar.io.MailSink --archive ${StreamPath}/mail-sink-${StreamVersion}.nar --tenant ${Tenant} --namespace ${Namespace} --name __sink_mail --inputs persistent://${Tenant}/${Namespace}/${ActionPrefix}_mail --sink-config-file ${StreamPath}/mail-sink-config.yml
    else
        ${PulsarAdmin} sink update --className io.emqx.pulsar.io.MailSink --archive ${StreamPath}/mail-sink-${StreamVersion}.nar --tenant ${Tenant} --namespace ${Namespace} --name __sink_mail --inputs persistent://${Tenant}/${Namespace}/${ActionPrefix}_mail --sink-config-file ${StreamPath}/mail-sink-config.yml
    fi
fi

if [ "$command" = "all" ] || [ "$command" = "sink-mqtt" ]; then
    echo "Creating MQTT Sink"
    if [ "$1" == "-force" ] || [ "$createUpdate" == "create" ]; then
        ${PulsarAdmin} functions delete --fqfn ${Tenant}/${Namespace}/__sink_mqtt
        ${PulsarAdmin} sink create --className io.emqx.pulsar.io.EMQXSink --archive ${StreamPath}/emqx-source-${StreamVersion}.nar --tenant ${Tenant} --namespace ${Namespace} --name __sink_mqtt --inputs persistent://${Tenant}/${Namespace}/${ActionPrefix}_mqtt --sink-config-file ${StreamPath}/emqx-config.yml
    else
        ${PulsarAdmin} sink update --className io.emqx.pulsar.io.EMQXSink --archive ${StreamPath}/emqx-source-${StreamVersion}.nar --tenant ${Tenant} --namespace ${Namespace} --name __sink_mqtt --inputs persistent://${Tenant}/${Namespace}/${ActionPrefix}_mqtt --sink-config-file ${StreamPath}/emqx-config.yml
    fi
fi

if [ "$command" = "all" ] || [ "$command" = "sink-publish" ]; then
    echo "Creating Publish Sink"
    if [ "$1" == "-force" ] || [ "$createUpdate" == "create" ]; then
        ${PulsarAdmin} functions delete --fqfn ${Tenant}/${Namespace}/__sink_publish
        ${PulsarAdmin} sink create --className io.emqx.pulsar.io.PublishSink --archive ${StreamPath}/publish-sink-${StreamVersion}.nar --tenant ${Tenant} --namespace ${Namespace} --name __sink_publish --inputs persistent://${Tenant}/${Namespace}/${ActionPrefix}_publish --sink-config-file ${StreamPath}/publish-sink-config.yml
    else
        ${PulsarAdmin} sink update --className io.emqx.pulsar.io.PublishSink --archive ${StreamPath}/publish-sink-${StreamVersion}.nar --tenant ${Tenant} --namespace ${Namespace} --name __sink_publish --inputs persistent://${Tenant}/${Namespace}/${ActionPrefix}_publish --sink-config-file ${StreamPath}/publish-sink-config.yml
    fi
fi


if [ "$command" = "all" ] || [ "$command" = "sink-ac-db" ]; then
    echo "Creating Actorcloud Database Sink"
    if [ "$1" == "-force" ] || [ "$createUpdate" == "create" ]; then
        ${PulsarAdmin} functions delete --fqfn ${Tenant}/${Namespace}/__sink_ac_db
        ${PulsarAdmin} sink create --className io.emqx.pulsar.io.ActorcloudSink --archive ${StreamPath}/actorcloud-db-sink-${StreamVersion}.nar --tenant ${Tenant} --namespace ${Namespace} --name __sink_ac_db --inputs persistent://${Tenant}/${Namespace}/__emqx_all --sink-config-file ${StreamPath}/actorcloud-db-sink-config.yml
    else
        ${PulsarAdmin} sink update --className io.emqx.pulsar.io.ActorcloudSink --archive ${StreamPath}/actorcloud-db-sink-${StreamVersion}.nar --tenant ${Tenant} --namespace ${Namespace} --name __sink_ac_db --inputs persistent://${Tenant}/${Namespace}/__emqx_all --sink-config-file ${StreamPath}/actorcloud-db-sink-config.yml
    fi
fi

if [ "$command" = "rules" ]; then
    echo "Update existing rules"
    output=$(${PulsarAdmin} functions list)
    while read -r line; do
        if [[ ${line} == __rule_* ]]; then
            ${PulsarAdmin} functions restart --fqfn ${Tenant}/${Namespace}/${line}
        fi
    done <<< "$output"
fi

echo "Setup done successfully!"
